# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import itertools as it
from hysop import __DEBUG__, main_size
from hysop.constants import MemoryOrdering
from hysop.tools.transposition_states import TranspositionState
from hysop.tools.htypes import to_list, to_tuple, check_instance, first_not_None
from hysop.tools.numpywrappers import npw
from hysop.tools.decorators import debug
from hysop.topology.topology import Topology
from hysop.topology.topology_descriptor import TopologyDescriptor
from hysop.core.graph.computational_node import ComputationalGraphNode
from hysop.fields.continuous_field import ScalarField
from hysop.fields.discrete_field import DiscreteScalarField
# Debug level for topology creation
# 0: no debug logs
# 1: topo creation summary for each field
# 2: topo creation details for all discrete field requirements
TOPO_CREATION_DEBUG_LEVEL = 0
[docs]
def gprint(*args, **kwds):
level = kwds.pop("level", 2)
if TOPO_CREATION_DEBUG_LEVEL >= level:
print(*args, **kwds)
[docs]
class DiscreteFieldRequirements:
__slots__ = (
"_operator",
"_field",
"_variables",
"_topology",
"_dim",
"_min_ghosts",
"_max_ghosts",
"_can_split",
"_axes",
"_memory_order",
"_registered",
"_work_dim",
"_topology_descriptor",
"_header",
)
_registered_requirements = set()
def __init__(
self,
operator,
variables,
field,
min_ghosts=None,
max_ghosts=None,
can_split=None,
memory_order=None,
axes=None,
_register=True,
**kwds,
):
if _register:
key = (id(operator), id(variables), id(field))
if key in self._registered_requirements:
msg = "Operator {} has already registered requirements for field {} "
msg += "to variables id {}."
msg = msg.format(operator.name, field.name, id(variables))
raise RuntimeError(msg)
else:
if __DEBUG__:
msg = "Operator {} registered requirements of field {} to variables id {}."
msg = msg.format(operator.name, field.name, id(variables))
print(msg)
self._registered_requirements.update(key)
super().__init__(**kwds)
check_instance(field, ScalarField)
check_instance(operator, ComputationalGraphNode, allow_none=(not _register))
check_instance(
variables,
dict,
keys=ScalarField,
values=(Topology, TopologyDescriptor),
allow_none=(not _register),
)
self._operator = operator
self._field = field
self._work_dim = field.dim
self._variables = variables
self._topology_descriptor = variables[field] if variables else None
self._dim = field.dim
self._header = "::{}[{}] requirements::\n".format(
getattr(operator, "name", "UnknownOperator"),
getattr(field, "name", "UnknownField"),
)
self._registered = _register
self.min_ghosts = min_ghosts
self.max_ghosts = max_ghosts
self.can_split = can_split
self.memory_order = memory_order
self.axes = axes
[docs]
def as_dfr(self):
return self
[docs]
def copy(self):
return DiscreteFieldRequirements(
operator=self._operator,
variables=self._variables,
field=self._field,
min_ghosts=self._min_ghosts,
max_ghosts=self._max_ghosts,
can_split=self._can_split,
memory_order=self._memory_order,
axes=self._axes,
)
[docs]
def is_default(self):
return self == self._default()
def _default(self):
return DiscreteFieldRequirements(
self._operator, self._variables, self._field, _register=False
)
def __eq__(self, other):
eq = self.operator is other.operator
eq &= self.variables is other.variables
eq &= self.field is other.field
eq &= (self.min_ghosts == other.min_ghosts).all()
eq &= (self.max_ghosts == other.max_ghosts).all()
eq &= (self.can_split == other.can_split).all()
eq &= self.memory_order == other.memory_order
eq &= self.tstates == other.tstates
return eq
def __hash__(self):
return (
id(self.operator)
^ id(self.variables)
^ id(self.field)
^ hash(
(
to_tuple(self.min_ghosts),
to_tuple(self.max_ghosts),
self.memory_order,
self.tstates,
)
)
)
[docs]
def ghost_str(self, array):
inf = "+∞"
vals = ["" + str(x) if np.isfinite(x) else inf for x in array]
return "[{}]".format(",".join(vals)).strip()
def __str__(self):
return "{:15s} {:>10s}<=ghosts<{:<10s} memory_order={} can_split={} tstates={}".format(
"{}::{}".format(
getattr(self.operator, "name", "UnknownOperator"),
getattr(self.field, "name", "UnknownField"),
),
self.ghost_str(self.min_ghosts),
self.ghost_str(self.max_ghosts + 1),
self.memory_order,
"" + str(self.can_split.view(np.int8)),
(
"[{}]".format(",".join("" + str(ts) for ts in self.tstates))
if self.tstates
else "ANY"
),
)
[docs]
def get_axes(self):
return self._axes
[docs]
def set_axes(self, axes):
check_instance(axes, tuple, values=tuple, allow_none=True)
if axes:
if not all([len(_) == self._dim for _ in axes]):
msg = f"all given axis should be of length {self._dim}, given {axes}"
assert False, msg
self._axes = axes
[docs]
def get_tstates(self):
all_axes = self._axes
if all_axes is None:
return None
else:
return tuple(TranspositionState[self._dim](axes) for axes in all_axes)
[docs]
def get_memory_order(self):
return self._memory_order
[docs]
def set_memory_order(self, memory_order):
check_instance(memory_order, MemoryOrdering, allow_none=True)
if memory_order is None:
memory_order = MemoryOrdering.ANY
assert memory_order in (
MemoryOrdering.C_CONTIGUOUS,
MemoryOrdering.F_CONTIGUOUS,
MemoryOrdering.ANY,
), memory_order
self._memory_order = memory_order
[docs]
def get_min_ghosts(self):
return self._min_ghosts
[docs]
def set_min_ghosts(self, min_ghosts):
self._min_ghosts = np.asarray(
to_list(min_ghosts) if (min_ghosts is not None) else [0] * self.workdim
)
assert self.min_ghosts.size == self.workdim
[docs]
def get_max_ghosts(self):
return self._max_ghosts
[docs]
def set_max_ghosts(self, max_ghosts):
self._max_ghosts = np.asarray(
to_list(max_ghosts) if (max_ghosts is not None) else [np.inf] * self.workdim
)
assert self.max_ghosts.size == self.workdim
[docs]
def get_can_split(self):
return self._can_split
[docs]
def set_can_split(self, can_split):
self._can_split = np.asarray(
to_list(can_split) if (can_split is not None) else [1] * self.workdim,
dtype=np.bool_,
)
assert self.can_split.size == self.workdim
[docs]
def get_work_dim(self):
return self._work_dim
[docs]
def get_operator(self):
return self._operator
[docs]
def get_field(self):
return self._field
[docs]
def get_variables(self):
return self._variables
[docs]
def get_topology_descriptor(self):
return self._topology_descriptor
can_split = property(get_can_split, set_can_split)
min_ghosts = property(get_min_ghosts, set_min_ghosts)
max_ghosts = property(get_max_ghosts, set_max_ghosts)
axes = property(get_axes, set_axes)
tstates = property(get_tstates)
memory_order = property(get_memory_order, set_memory_order)
workdim = property(get_work_dim)
operator = property(get_operator)
field = property(get_field)
variables = property(get_variables)
topology_descriptor = property(get_topology_descriptor)
[docs]
def is_compatible_with(self, other, i=None):
assert self.field == other.field, "field mismatch."
if isinstance(other, DiscreteFieldRequirements):
others = {other}
elif isinstance(other, MultiFieldRequirements):
if self.topology_descriptor in other.requirements.keys():
others = other.requirements[self.topology_descriptor]
else:
return True
else:
msg = f"Unknown type {other.__class__}."
raise TypeError(msg)
for other in others:
assert self.workdim == other.workdim, "workdim mismatch."
assert (
self.topology_descriptor == other.topology_descriptor
), "topology_descriptor mismatch."
if (self.field.lboundaries != other.field.lboundaries).any():
if i is not None:
gprint(f" => lboundaries mismatch with subgroup {i}")
return False
if (self.field.rboundaries != other.field.rboundaries).any():
if i is not None:
gprint(f" => rboundaries mismatch with subgroup {i}")
return False
if (other.max_ghosts < self.min_ghosts).any():
if i is not None:
gprint(f" => ghosts incompatibility with subgroup {i}")
return False
if (other.min_ghosts > self.max_ghosts).any():
if i is not None:
gprint(f" => ghosts incompatibility with subgroup {i}")
return False
multiprocess = main_size > 1
if multiprocess and not (other.can_split * self.can_split).any():
if i is not None:
gprint(f" => splitting incompatibility with subgroup {i}")
return False
if i is not None:
gprint(f" => compatible with subgroup {i}")
return True
[docs]
def update_requirements(self, other):
assert self.is_compatible_with(other)
assert self.memory_order == other.memory_order
assert (self.tstates is None) or self.tstates.intersection(other.tstates)
self.min_ghosts = np.maximum(self.min_ghosts, other.min_ghosts)
self.max_ghosts = np.minimum(self.max_ghosts, other.max_ghosts)
self.can_split *= other.can_split
if self.axes:
self.axes = self.axes.intersection(other.axes) if other.axes else self.axes
else:
self.axes = other.axes
[docs]
def check_topology(self, topology=None):
topology = topology or self.variables[self.field]
check_instance(topology, Topology)
if topology.domain.dim != self.field.dim:
msg = "{} Dimension mismatch between field and topology.\n field={}d, topology={}d."
msg = msg.format(self._header, self.field.dim, topology.domain.dim)
raise RuntimeError(msg)
if (topology.grid_resolution != self.topology_descriptor.grid_resolution).any():
msg = "{} Grid resolution mismatch between requirement and topology.\n "
msg += " requirement={}\n topology={}"
msg = msg.format(
self._header,
self.topology_descriptor.grid_resolution,
topology.grid_resolution,
)
raise RuntimeError(msg)
if (
topology.global_resolution != self.topology_descriptor.global_resolution
).any():
msg = "{} Global resolution mismatch between requirement and topology.\n "
msg += " requirement={}\n topology={}"
msg = msg.format(
self._header,
self.topology_descriptor.global_resolution,
topology.global_resolution,
)
raise RuntimeError(msg)
if (topology.ghosts < self.min_ghosts).any():
msg = "{} min ghosts constraint was not met.\n min={}, actual={}."
msg = msg.format(self._header, self.min_ghosts, topology.ghosts)
raise RuntimeError(msg)
if (topology.ghosts > self.max_ghosts).any():
msg = "{} max ghosts constraint was not met.\n max={}, actual={}."
msg = msg.format(self._header, self.max_ghosts, topology.ghosts)
raise RuntimeError(msg)
[docs]
def check_discrete_topology_state(self, state):
from hysop.topology.cartesian_topology import CartesianTopologyState
check_instance(state, CartesianTopologyState)
if (
(self.memory_order is not None)
and (self.memory_order is not MemoryOrdering.ANY)
and (self.memory_order != state.memory_order)
):
msg = "{} memory_order mismatch between requirement and topology state.\n reqs={}, state={}."
msg = msg.format(self._header, self.memory_order, state.memory_order)
raise RuntimeError(msg)
if (self.tstates is not None) and (state.tstate not in self.tstates):
msg = "{} Transposition state mismatch between requirement and topology state.\n"
msg += " reqs=[{}], state={}."
msg = msg.format(
self._header, ",".join([str(x) for x in self.tstates]), state.tstate
)
raise RuntimeError(msg)
[docs]
def check_state(self, dfield):
check_instance(dfield, DiscreteScalarField)
self.check_topology(dfield.topology)
self.check_discrete_topology_state(dfield.state)
[docs]
def set_and_check_topology(self, topology):
"""
Check topology and replace a TopologyDescriptor by a Topology instance in
self.variables[self.field].
"""
assert isinstance(topology, Topology)
assert not isinstance(self.variables[self.field], Topology) or (
self.variables[self.field] == topology
)
self.check_topology(topology)
self.variables[self.field] = topology
[docs]
class MultiFieldRequirements:
__slots__ = ("field", "requirements", "built", "common_can_split")
def __init__(self, field):
self.field = field
self.requirements = {}
self.built = False
self.common_can_split = None
[docs]
def copy(self):
requirements = {k: v.copy() for (k, v) in self.requirements.items()}
obj = MultiFieldRequirements(field=self.field)
obj.built = self.built
obj.requirements = requirements
return obj
[docs]
def as_dfr(self):
# return a DiscreteFieldRequirements if there is only one requirement
if self.nrequirements() == 0:
return None
else:
assert self.nrequirements() == 1
return next(iter(tuple(self.requirements.values())[0]))
[docs]
def nrequirements(self):
return sum(len(lreqs) for lreqs in self.requirements.values())
[docs]
def update(self, *update_reqs):
for update_req in update_reqs:
if update_req is None:
continue
if isinstance(update_req, MultiFieldRequirements):
tds = update_req.requirements.keys()
reqs = update_req.requirements.values()
else:
tds = [update_req.topology_descriptor]
reqs = [[update_req]]
for td, req in zip(tds, reqs):
self.requirements.setdefault(td, set()).update(req)
[docs]
def build_topologies(self):
gprint(
f"\nMULTIFIELD_REQUIREMENTS.BUILD_TOPOLOGIES() for field {self.field.name}"
)
if self.built:
return
gprint(" 1) SPLITTING REQUIREMENTS IN COMPATIBLE SUBGROUPS:")
multi_process = tuple(self.requirements.keys())[0].mpi_params.size > 1
splitted_reqs = self._split(multi_process)
gprint(
" 2) DETERMINING COMMON CARTESIAN TOPOLOGY SPLITTING AXES (if possible):"
)
can_split = 1
for i, compatible_reqs in enumerate(splitted_reqs):
subgroup_can_split = compatible_reqs.common_can_split
can_split *= subgroup_can_split
gprint(f" *subgroup{i}.can_split = {subgroup_can_split}")
gprint(
" => Global available split directions for field {} are {}".format(
self.field.name, can_split
)
)
if can_split.any():
gprint(
" => Enforcing this configuration for Cartesian topology creation."
)
for compatible_reqs in splitted_reqs:
compatible_reqs.common_can_split = can_split
else:
gprint(" => No common splitting axes found between all subgroups.")
gprint(" 3) BUILDING TOPOLOGIES:")
all_topologies = set()
for i, compatible_reqs in enumerate(splitted_reqs):
gprint(f" *building topology for requirement group {i}")
subgroup_topologies = compatible_reqs._build_compatible_topologies()
all_topologies.update(subgroup_topologies)
gprint(
f" Summary of topologies for field {self.field.name}, subgroup {i}:"
)
for topo in subgroup_topologies:
gprint(f" *{topo.short_description()}")
gprint("", level=1)
gprint(f" Summary of topologies for field {self.field.name}:")
for topo in all_topologies:
gprint(f" *{topo.short_description()}")
gprint("", level=1)
self.built = True
[docs]
def all_compatible(self):
for topology_descriptor in self.requirements:
requirements = self.requirements[topology_descriptor]
assert len(requirements) > 0
for req0, req1 in it.combinations(requirements, 2):
if not req0.is_compatible_with(req1):
return False
return True
def _split(self, multi_process):
sub_field_requirements = []
for lreq in self.requirements.values():
for req in sorted(lreq, key=lambda x: str(x)):
gprint(f" *Requirement {req}")
ok = False
for i, multi_reqs in enumerate(sub_field_requirements):
if req.is_compatible_with(multi_reqs, i):
multi_reqs.update(req)
ok = True
break
if not ok:
gprint(
" => this requirement is not compatible with any existing requirement group, creating a new one (subgroup {}).".format(
len(sub_field_requirements)
)
)
new_group = MultiFieldRequirements(self.field)
new_group.update(req)
sub_field_requirements.append(new_group)
assert self.nrequirements() == sum(
sf.nrequirements() for sf in sub_field_requirements
)
for multi_reqs in sub_field_requirements:
for topology_descriptor, reqs in multi_reqs.requirements.items():
if isinstance(topology_descriptor, Topology):
dim = topology_descriptor.domain_dim
else:
dim = topology_descriptor.dim
can_split = npw.integer_ones(shape=(dim,))
for req in reqs:
if isinstance(req.topology_descriptor, Topology):
can_split *= req.topology_descriptor.proc_shape > 1
else:
can_split *= req.can_split
assert (not multi_process) or can_split.any()
multi_reqs.common_can_split = can_split
return sub_field_requirements
def _build_compatible_topologies(self):
assert self.all_compatible()
all_topologies = set()
for topology_descriptor, reqs in self.requirements.items():
if isinstance(topology_descriptor, Topology):
gprint(f" -Topology {topology_descriptor.short_description()}")
dim = topology_descriptor.domain_dim
known_topologies = {topology_descriptor}
else:
gprint(f" -Topology descriptor {topology_descriptor}")
dim = topology_descriptor.dim
known_topologies = set()
unknown_topologies = set()
ghosts = npw.integer_zeros(shape=(dim,))
can_split = npw.integer_ones(shape=(dim,))
for req in reqs:
if isinstance(req.topology_descriptor, Topology):
req.check_topology()
known_topologies.add(req.topology_descriptor)
else:
ghosts = np.maximum(ghosts, req.min_ghosts)
can_split *= req.can_split
unknown_topologies.add(req)
for req in unknown_topologies:
gprint(
" >choose or create topology from {} existing topologies:".format(
len(known_topologies)
),
end="",
)
topo = req.topology_descriptor.choose_or_create_topology(
known_topologies, ghosts=ghosts, cutdirs=self.common_can_split
)
if topo in known_topologies:
gprint(f" choosed existing topology {topo.pretty_tag}.")
else:
gprint(f"\n Created topology {topo.short_description()}")
known_topologies.add(topo)
req.set_and_check_topology(topo)
all_topologies.update(known_topologies)
return all_topologies
[docs]
class OperatorFieldRequirements:
__slots__ = ("_input_field_requirements", "_output_field_requirements")
def __init__(
self, input_field_requirements=None, output_field_requirements=None, **kwds
):
super().__init__(**kwds)
check_instance(
input_field_requirements,
dict,
keys=ScalarField,
values=MultiFieldRequirements,
allow_none=True,
)
self._input_field_requirements = first_not_None(input_field_requirements, {})
check_instance(
output_field_requirements,
dict,
keys=ScalarField,
values=MultiFieldRequirements,
allow_none=True,
)
self._output_field_requirements = first_not_None(output_field_requirements, {})
[docs]
def get_output_field_requirements(self):
return self._output_field_requirements
input_field_requirements = property(get_input_field_requirements)
output_field_requirements = property(get_output_field_requirements)
[docs]
def update(self, requirements):
check_instance(requirements, OperatorFieldRequirements)
self.update_inputs(requirements._input_field_requirements)
self.update_outputs(requirements._output_field_requirements)
[docs]
def update_outputs(self, output_field_requirements):
self._update_requirements(
self._output_field_requirements, output_field_requirements
)
def _update_requirements(self, self_reqs, reqs):
check_instance(
reqs,
dict,
keys=ScalarField,
values=(DiscreteFieldRequirements, MultiFieldRequirements, type(None)),
)
for field, reqs in reqs.items():
if reqs is not None:
reqs = reqs.copy()
if not isinstance(reqs, MultiFieldRequirements):
_reqs = reqs
reqs = MultiFieldRequirements(field)
reqs.update(_reqs)
if field in self_reqs:
self_reqs[field].update(reqs)
else:
self_reqs[field] = reqs
[docs]
def iter_output_requirements(self):
"""
Iterates over (field, topology_descriptor, field_requirement) for all output requirements.
"""
for field, freqs in self.output_field_requirements.items():
freqs = freqs.requirements
for td, reqs in freqs.items():
for req in reqs:
yield (field, td, req)
[docs]
def iter_requirements(self):
"""
Iterates over (is_input, field, topology_descriptor, field_requirement) for
all inputs and outputs.
"""
it0 = it.zip_longest((True,), self.iter_input_requirements())
it1 = it.zip_longest((False,), self.iter_output_requirements())
return it.chain(it0, it1)
def _get_requirement(self, field, field_requirements):
"""
Get unique requirement and topology descriptor for given field, if it exists.
This is a facility for ComputationalGraphOperators to retrieve their unique i
per field requirements.
If field is not an hysop.fields.continuous_field.ScalarField, a TypeError is raised.
If field is not known, an Attribute error is raised.
If multiple topology_descriptors or requirements are present (ie. there is no unicity),
this will raise a RuntimeError.
"""
check_instance(field, ScalarField)
if field not in field_requirements:
msg = f"No requirements found for field {field.name}."
raise AttributeError(msg)
freqs = field_requirements[field].requirements
if len(freqs.keys()) > 1:
msg = f"Multiple topology descriptors are present for field {field.name}."
raise RuntimeError(msg)
if len(freqs.keys()) == 0:
msg = f"No topology descriptors are present for field {field.name}."
raise RuntimeError(msg)
td = tuple(freqs.keys())[0]
reqs = freqs[td]
if len(reqs) > 1:
msg = f"Multiple requirements are present for field {field.name}."
raise RuntimeError(msg)
return (td, next(iter(reqs)))
[docs]
def get_output_requirement(self, field):
return self._get_requirement(field, self._output_field_requirements)
[docs]
@debug
def build_topologies(self):
fields = set(self._input_field_requirements.keys()).union(
self._output_field_requirements.keys()
)
# enforce deterministic iteration
for field in sorted(fields, key=lambda x: f"{x.name}::{x.short_description()}"):
reqs = MultiFieldRequirements(field)
reqs.update(
self._input_field_requirements.get(field, None),
self._output_field_requirements.get(field, None),
)
reqs.build_topologies()